Defer task spawning in SortPreservingMergeExec to first poll#21328
Defer task spawning in SortPreservingMergeExec to first poll#21328Dandandan wants to merge 4 commits intoapache:mainfrom
Conversation
Previously, SortPreservingMergeExec eagerly executed all input partitions and spawned buffered tasks in execute(). This meant that even if the output stream was never polled, all tasks would be spawned. This changes the multi-partition path to use a lazy stream that defers spawning and building the streaming merge until first poll_next(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmark tpcds |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lazy-sort-preserving-merge (6eff550) to 4084a18 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
cc @neilconway |
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Do you expect any performance change due to this PR? |
Which issue does this PR close?
Rationale for this change
I would like for DataFusion to be more explicit about parallelism / concurrency.
SortPreservingMergeExec is one of a few nodes that executes the stream eagerly, i.e. it spawns buffered tasks immediately, even before any batch is polled from the output stream.
This makes it difficult to control/limit pipeline parallelism and reduce (memory) resources.
Also it makes short-circuiting of complex plans less useful as it might already have executed a large part of the plan.
What changes are included in this PR?
Introduces a
LazySortPreservingMergeStreamthat defers executing input partitions, spawning buffered tasks, and building the streaming merge until the firstpoll_next()call. The single-partition and zero-partition paths are unchanged.In practice, this has no big effect, as
SortPreservingMergeExecis usually on the root of queries (in plans a partitionSortExecis more common as we usually don't need/avoid global order inside plans)How are these changes tested?
All existing tests pass (17/17).
🤖 Generated with Claude Code